Skip to content

feat(callcenter): wire LanceVersionWatcher + DrainTask scaffold (DM-4a/b/c, DM-6a/b)#255

Merged
AdaWorldAPI merged 3 commits into
mainfrom
claude/supabase-subscriber-wire-up
Apr 24, 2026
Merged

feat(callcenter): wire LanceVersionWatcher + DrainTask scaffold (DM-4a/b/c, DM-6a/b)#255
AdaWorldAPI merged 3 commits into
mainfrom
claude/supabase-subscriber-wire-up

Conversation

@AdaWorldAPI

Copy link
Copy Markdown
Owner

Summary

  • DM-4a/b/c: LanceVersionWatcher in version_watcher.rs (117 LOC, 4 tests); #[cfg(feature="realtime")] pub mod in lib.rs; LanceMembrane gains watcher field, project() calls bump(row), subscribe() returns watch::Receiver<CognitiveEventRow> — flips Subscription type from Phase-A mpsc::Receiver<u64> stub to live supabase-shape always-latest fan-out.
  • DM-6a/b: DrainTask scaffold in drain.rs (89 LOC, 2 tests), Future impl returning Poll::Pending; unconditional pub mod drain in lib.rs.
  • New test subscribe_receives_on_project (under [realtime]): calls project(), asserts rx.borrow().thinking matches the projected row.
  • Board hygiene: STATUS_BOARD.md DM-4/DM-6 → In PR; INTEGRATION_PLANS.md prepend entry; EPIPHANIES.md prepend finding.

Test plan

  • cargo test -p lance-graph-callcenter --lib — 13 pass (without realtime)
  • cargo test -p lance-graph-callcenter --lib --features realtime — 17 pass (4 new in version_watcher, 1 new subscribe_receives_on_project)
  • cargo check -p lance-graph-callcenter — clean (no callcenter warnings)
  • cargo check -p lance-graph-callcenter --features realtime — clean

Key constraint respected

Tokio was already an optional dep under [realtime] in Cargo.toml — no new dependencies added.

https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh


Generated by Claude Code

claude added 2 commits April 24, 2026 20:23
…a/b/c, DM-6a/b)

Flips LanceMembrane::subscribe() from Phase-A disconnected stub to a live
tokio::sync::watch::Receiver<CognitiveEventRow> behind the [realtime] feature
gate (supabase-shape always-latest fan-out). project() now calls
watcher.bump(row) so every subscriber observes the latest committed
CognitiveEventRow without polling.

Changes:
- crates/lance-graph-callcenter/src/version_watcher.rs — NEW (DM-4b): LanceVersionWatcher
  wrapping watch::Sender<CognitiveEventRow>; bump/subscribe/receiver_count; 4 tests.
- crates/lance-graph-callcenter/src/drain.rs — NEW (DM-6a): DrainTask scaffold,
  Future impl returning Poll::Pending; 2 tests.
- lib.rs — uncomment pub mod version_watcher (#[cfg(feature="realtime")]) and
  pub mod drain (unconditional) (DM-4c, DM-6b).
- lance_membrane.rs — add watcher: LanceVersionWatcher field; Subscription type is
  watch::Receiver<CognitiveEventRow> under [realtime] / mpsc::Receiver<u64> without;
  project() calls watcher.bump(row.clone()); subscribe() returns watcher.subscribe();
  new test subscribe_receives_on_project (flips subscribe_returns_disconnected_receiver
  under [realtime]).
- Board: STATUS_BOARD DM-4/DM-6 → In PR; INTEGRATION_PLANS prepend supabase-subscriber-v1;
  EPIPHANIES prepend finding entry.
- .claude/plans/supabase-subscriber-v1.md committed (plan file).

Tests: 13 pass (no realtime), 17 pass (with realtime). Zero regressions.

https://claude.ai/code/session_01SbYsmmbPf9YQuYbHZN52Zh

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ec3b5c7742

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +38 to +39
let (tx, _rx) = watch::channel(initial);
Self { tx }

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve watcher state without active subscribers

Dropping the initial receiver immediately means the channel can have zero receivers for long periods, and bump() then fails instead of updating the stored value. In practice, any project() that happens before the first subscriber (or between subscriber disconnect/reconnect windows) is lost, so new subscribers do not get the latest committed row even though this module/documentation describes always-latest behavior.

Useful? React with 👍 / 👎.

Comment on lines +217 to +218
fn subscribe(&self, _filter: CommitFilter) -> watch::Receiver<CognitiveEventRow> {
self.watcher.subscribe()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Honor CommitFilter in realtime subscribe path

The realtime subscribe implementation ignores the provided CommitFilter and always returns an unfiltered watcher receiver. Because this commit turns subscriptions into live event delivery, callers that request actor/free-energy/commit filtering now receive every row, which breaks the contract implied by subscribe(filter) and can leak unrelated events to subscribers that expected scoped data.

Useful? React with 👍 / 👎.

…riber-wire-up

# Conflicts:
#	.claude/board/AGENT_LOG.md
@AdaWorldAPI AdaWorldAPI merged commit 10744cc into main Apr 24, 2026
0 of 2 checks passed
@AdaWorldAPI AdaWorldAPI deleted the claude/supabase-subscriber-wire-up branch May 15, 2026 21:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants